home *** CD-ROM | disk | FTP | other *** search
- /* ----------------------------------------------------------------
- * FILE
- * hash.c
- *
- * DESCRIPTION
- * Routines to hash relations for hashjoin
- *
- * INTERFACE ROUTINES
- * ExecHash - generate an in-memory hash table of the relation
- * ExecInitHash - initialize node and subnodes..
- * ExecEndHash - shutdown node and subnodes
- *
- * NOTES
- *
- * IDENTIFICATION
- * $Header: /private/postgres/src/executor/RCS/n_hash.c,v 1.17 1992/08/19 01:10:30 mer Exp $
- * ----------------------------------------------------------------
- */
-
- #include <math.h>
- #include <sys/file.h>
- #include "storage/ipci.h"
- #include "storage/bufmgr.h" /* for BLCKSZ */
- #include "tcop/slaves.h"
- #include "executor/executor.h"
-
- RcsId("$Header: /private/postgres/src/executor/RCS/n_hash.c,v 1.17 1992/08/19 01:10:30 mer Exp $");
-
-
- extern int NBuffers;
- static int HashTBSize;
-
- /* ----------------------------------------------------------------
- * ExecHash
- *
- * build hash table for hashjoin, all do partitioning if more
- * than one batches are required.
- * ----------------------------------------------------------------
- */
- /**** xxref:
- * ExecProcNode
- ****/
- TupleTableSlot
- ExecHash(node)
- Hash node;
- {
- EState estate;
- HashState hashstate;
- Plan outerNode;
- Var hashkey;
- HashJoinTable hashtable;
- HeapTuple heapTuple;
- TupleTableSlot slot;
- ExprContext econtext;
-
- int nbatch;
- File *batches;
- RelativeAddr *batchPos;
- int *batchSizes;
- char *tempname;
- int i;
- RelativeAddr *innerbatchNames;
-
- /* ----------------
- * get state info from node
- * ----------------
- */
-
- hashstate = get_hashstate(node);
- estate = (EState) get_state((Plan) node);
- outerNode = get_outerPlan((Plan) node);
-
- if (!IsMaster && ParallelExecutorEnabled()) {
- IpcMemoryId shmid;
- IpcMemoryKey hashtablekey;
- int hashtablesize;
- int tmpfd;
-
- hashtablekey = get_hashtablekey(node);
- hashtablesize = get_hashtablesize(node);
-
- /* ----------------
- * in Sequent version, shared memory is implemented by
- * memory mapped files, it takes one file descriptor.
- * we may have to free one for this.
- * ----------------
- */
- closeOneVfd();
- shmid = IpcMemoryCreateWithoutOnExit(hashtablekey,
- hashtablesize,
- HASH_PERMISSION);
- hashtable = (HashJoinTable) IpcMemoryAttach(shmid);
- set_hashtable(node, hashtable);
- }
-
- hashtable = get_hashtable(node);
- if (hashtable == NULL)
- elog(WARN, "ExecHash: hash table is NULL.");
-
- nbatch = hashtable->nbatch;
-
- if (nbatch > 0) { /* if needs hash partition */
- innerbatchNames = (RelativeAddr *) ABSADDR(hashtable->innerbatchNames);
-
- /* --------------
- * allocate space for the file descriptors of batch files
- * then open the batch files in the current processes.
- * --------------
- */
- batches = (File*)palloc(nbatch * sizeof(File));
- for (i=0; i<nbatch; i++) {
- batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
- O_CREAT | O_RDWR, 0600);
- }
- set_hashBatches(hashstate, batches);
- batchPos = (RelativeAddr*) ABSADDR(hashtable->innerbatchPos);
- batchSizes = (int*) ABSADDR(hashtable->innerbatchSizes);
- }
-
- /* ----------------
- * set expression context
- * ----------------
- */
- hashkey = get_hashkey(node);
- econtext = get_cs_ExprContext((CommonState) hashstate);
-
- /* ----------------
- * get tuple and insert into the hash table
- * ----------------
- */
- for (;;) {
- slot = ExecProcNode(outerNode);
- if (TupIsNull((Pointer)slot))
- break;
-
- set_ecxt_innertuple(econtext, slot);
- ExecHashTableInsert(hashtable, econtext, hashkey,
- get_hashBatches(hashstate));
-
- ExecClearTuple((Pointer) slot);
- }
-
- /*
- * end of build phase, flush all the last pages of the batches.
- */
- for (i=0; i<nbatch; i++) {
- if (FileSeek(batches[i], 0L, L_XTND) < 0)
- perror("FileSeek");
- if (FileWrite(batches[i],ABSADDR(hashtable->batch)+i*BLCKSZ,BLCKSZ) < 0)
- perror("FileWrite");
- NDirectFileWrite++;
- }
-
- /* ---------------------
- * Return the slot so that we have the tuple descriptor
- * when we need to save/restore them. -Jeff 11 July 1991
- * ---------------------
- */
- return slot;
- }
-
- /* ----------------------------------------------------------------
- * ExecInitHash
- *
- * Init routine for Hash node
- * ----------------------------------------------------------------
- */
- /**** xxref:
- * ExecInitNode
- ****/
- List
- ExecInitHash(node, estate, parent)
- Hash node;
- EState estate;
- Plan parent;
- {
- HashState hashstate;
- Plan outerPlan;
-
- SO1_printf("ExecInitHash: %s\n",
- "initializing hash node");
-
- /* ----------------
- * assign the node's execution state
- * ----------------
- */
- set_state((Plan) node, (EStatePtr)estate);
-
- /* ----------------
- * create state structure
- * ----------------
- */
- hashstate = MakeHashState(NULL);
- set_hashstate(node, hashstate);
- set_hashBatches(hashstate, NULL);
-
- /* ----------------
- * Miscellanious initialization
- *
- * + assign node's base_id
- * + assign debugging hooks and
- * + create expression context for node
- * ----------------
- */
- ExecAssignNodeBaseInfo(estate, (BaseNode) hashstate, parent);
- ExecAssignDebugHooks((Plan) node, (BaseNode) hashstate);
- ExecAssignExprContext(estate, (CommonState) hashstate);
-
- #define HASH_NSLOTS 1
- /* ----------------
- * initialize our result slot
- * ----------------
- */
- ExecInitResultTupleSlot(estate, (CommonState) hashstate);
-
- /* ----------------
- * initializes child nodes
- * ----------------
- */
- outerPlan = get_outerPlan((Plan) node);
- ExecInitNode(outerPlan, estate, (Plan) node);
-
- /* ----------------
- * initialize tuple type. no need to initialize projection
- * info because this node doesn't do projections
- * ----------------
- */
- ExecAssignResultTypeFromOuterPlan((Plan) node, (CommonState) hashstate);
- set_cs_ProjInfo((CommonState) hashstate, NULL);
-
- return
- LispTrue;
- }
-
- int
- ExecCountSlotsHash(node)
- Plan node;
- {
- return ExecCountSlotsNode(get_outerPlan(node)) +
- ExecCountSlotsNode(get_innerPlan(node)) +
- HASH_NSLOTS;
- }
-
- /* ---------------------------------------------------------------
- * ExecEndHash
- *
- * clean up routine for Hash node
- * ----------------------------------------------------------------
- */
-
- /**** xxref:
- * ExecEndNode
- ****/
- void
- ExecEndHash(node)
- Hash node;
- {
- HashState hashstate;
- Plan outerPlan;
- File *batches;
-
- /* ----------------
- * get info from the hash state
- * ----------------
- */
- hashstate = get_hashstate(node);
- batches = get_hashBatches(hashstate);
- if (batches != NULL)
- pfree(batches);
-
- /* ----------------
- * free projection info. no need to free result type info
- * because that came from the outer plan...
- * ----------------
- */
- ExecFreeProjectionInfo((CommonState) hashstate);
-
- /* ----------------
- * shut down the subplan
- * ----------------
- */
- outerPlan = get_outerPlan((Plan) node);
- ExecEndNode(outerPlan);
- }
-
- RelativeAddr
- hashTableAlloc(size, hashtable)
- int size;
- HashJoinTable hashtable;
- {
- RelativeAddr p;
- p = hashtable->top;
- hashtable->top += size;
- return p;
- }
-
- /* ----------------------------------------------------------------
- * ExecHashTableCreate
- *
- * create a hashtable in shared memory for hashjoin.
- * ----------------------------------------------------------------
- */
- #define NTUP_PER_BUCKET 10
- #define FUDGE_FAC 1.5
-
- HashJoinTable
- ExecHashTableCreate(node)
- Plan node;
- {
- Plan outerNode;
- int nbatch;
- int ntuples;
- int tupsize;
- IpcMemoryId shmid;
- HashJoinTable hashtable;
- HashBucket bucket;
- int nbuckets;
- int totalbuckets;
- int bucketsize;
- int i;
- int tmpfd;
- #ifdef sequent
- slock_t *batchLock;
- #endif
- RelativeAddr *outerbatchNames;
- RelativeAddr *outerbatchPos;
- RelativeAddr *innerbatchNames;
- RelativeAddr *innerbatchPos;
- int *innerbatchSizes;
- RelativeAddr tempname;
-
- nbatch = -1;
- HashTBSize = NBuffers/2;
- while (nbatch < 0) {
- /*
- * determine number of batches for the hashjoin
- */
- HashTBSize *= 2;
- nbatch = ExecHashPartition((Hash) node);
- }
- /* ----------------
- * get information about the size of the relation
- * ----------------
- */
- outerNode = get_outerPlan(node);
- ntuples = get_plan_size(outerNode);
- if (ntuples <= 0)
- ntuples = 1000; /* XXX just a hack */
- tupsize = get_plan_width(outerNode) + sizeof(HeapTupleData);
-
- /*
- * totalbuckets is the total number of hash buckets needed for
- * the entire relation
- */
- totalbuckets = ceil((double)ntuples/NTUP_PER_BUCKET);
- bucketsize = NTUP_PER_BUCKET * tupsize + sizeof(*bucket);
-
- /*
- * nbuckets is the number of hash buckets for the first pass
- * of hybrid hashjoin
- */
- nbuckets = (HashTBSize - nbatch) * BLCKSZ / (bucketsize * FUDGE_FAC);
- if (totalbuckets < nbuckets)
- totalbuckets = nbuckets;
- if (nbatch == 0)
- nbuckets = totalbuckets;
- #ifdef HJDEBUG
- printf("nbatch = %d, totalbuckets = %d, nbuckets = %d\n", nbatch, totalbuckets, nbuckets);
- #endif
-
- #ifdef sequent
- /* ----------------
- * in Sequent version, shared memory is implemented by
- * memory mapped files, it takes one file descriptor.
- * we may have to free one for this.
- * ----------------
- */
- closeOneVfd();
- if (ParallelExecutorEnabled()) {
- IpcMemoryKey hashtablekey;
- int hashtablesize;
- hashtablekey = get_hashtablekey((Hash)node);
- hashtablesize = (HashTBSize+SlaveLocalInfoD.nparallel)*BLCKSZ;
- shmid = IpcMemoryCreateWithoutOnExit(hashtablekey,
- hashtablesize,
- HASH_PERMISSION);
- set_hashtablesize((Hash)node, hashtablesize);
- }
- else
- shmid = IpcMemoryCreateWithoutOnExit(PrivateIPCKey,
- (HashTBSize+1)*BLCKSZ,HASH_PERMISSION);
- hashtable = (HashJoinTable) IpcMemoryAttach(shmid);
- #else /* sequent */
- /* ----------------
- * in non-parallel machines, we don't need to put the hash table
- * in the shared memory. We just palloc it.
- * ----------------
- */
- hashtable = (HashJoinTable)palloc((HashTBSize+1)*BLCKSZ);
- #endif /* sequent */
-
- if (hashtable == NULL) {
- elog(WARN, "not enough memory for hashjoin.");
- }
- /* ----------------
- * initialize the hash table header
- * ----------------
- */
- hashtable->nbuckets = nbuckets;
- hashtable->totalbuckets = totalbuckets;
- hashtable->bucketsize = bucketsize;
- hashtable->shmid = shmid;
- hashtable->top = sizeof(HashTableData);
- #ifdef sequent
- S_INIT_LOCK(&(hashtable->overflowlock));
- #endif
- hashtable->bottom = HashTBSize * BLCKSZ;
- /*
- * hashtable->readbuf has to be long aligned!!!
- */
- hashtable->readbuf = hashtable->bottom;
- hashtable->nbatch = nbatch;
- hashtable->curbatch = 0;
- hashtable->pcount = hashtable->nprocess = SlaveLocalInfoD.nparallel;
- #ifdef sequent
- S_INIT_BARRIER(&(hashtable->batchBarrier), hashtable->nprocess);
- S_INIT_LOCK(&(hashtable->tableLock));
- #endif
- if (nbatch > 0) {
- #ifdef sequent
- /* ---------------
- * allocate and initialize locks for each batch
- * ---------------
- */
- batchLock = (slock_t*)ABSADDR(
- hashTableAlloc(nbatch * sizeof(slock_t), hashtable));
- for (i=0; i<nbatch; i++)
- S_INIT_LOCK(&(batchLock[i]));
- hashtable->batchLock = RELADDR(batchLock);
- #endif
- /* ---------------
- * allocate and initialize the outer batches
- * ---------------
- */
- outerbatchNames = (RelativeAddr*)ABSADDR(
- hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
- outerbatchPos = (RelativeAddr*)ABSADDR(
- hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
- for (i=0; i<nbatch; i++) {
- tempname = hashTableAlloc(12, hashtable);
- mk_hj_temp(ABSADDR(tempname));
- outerbatchNames[i] = tempname;
- outerbatchPos[i] = -1;
- }
- hashtable->outerbatchNames = RELADDR(outerbatchNames);
- hashtable->outerbatchPos = RELADDR(outerbatchPos);
- /* ---------------
- * allocate and initialize the inner batches
- * ---------------
- */
- innerbatchNames = (RelativeAddr*)ABSADDR(
- hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
- innerbatchPos = (RelativeAddr*)ABSADDR(
- hashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable));
- innerbatchSizes = (int*)ABSADDR(
- hashTableAlloc(nbatch * sizeof(int), hashtable));
- for (i=0; i<nbatch; i++) {
- tempname = hashTableAlloc(12, hashtable);
- mk_hj_temp(ABSADDR(tempname));
- innerbatchNames[i] = tempname;
- innerbatchPos[i] = -1;
- innerbatchSizes[i] = 0;
- }
- hashtable->innerbatchNames = RELADDR(innerbatchNames);
- hashtable->innerbatchPos = RELADDR(innerbatchPos);
- hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
- }
- else {
- hashtable->outerbatchNames = (RelativeAddr)NULL;
- hashtable->outerbatchPos = (RelativeAddr)NULL;
- hashtable->innerbatchNames = (RelativeAddr)NULL;
- hashtable->innerbatchPos = (RelativeAddr)NULL;
- hashtable->innerbatchSizes = (RelativeAddr)NULL;
- }
-
- hashtable->batch = (RelativeAddr)LONGALIGN(hashtable->top +
- bucketsize * nbuckets);
- hashtable->overflownext=hashtable->batch + nbatch * BLCKSZ;
- /* ----------------
- * initialize each hash bucket
- * ----------------
- */
- bucket = (HashBucket)ABSADDR(hashtable->top);
- for (i=0; i<nbuckets; i++) {
- bucket->top = RELADDR((char*)bucket + sizeof(*bucket));
- bucket->bottom = bucket->top;
- bucket->firstotuple = bucket->lastotuple = -1;
- #ifdef sequent
- S_INIT_LOCK(&(bucket->bucketlock));
- #endif
- bucket = (HashBucket)((char*)bucket + bucketsize);
- }
- return(hashtable);
- }
-
- /* ----------------------------------------------------------------
- * ExecHashTableInsert
- *
- * insert a tuple into the hash table depending on the hash value
- * it may just go to a tmp file for other batches
- * ----------------------------------------------------------------
- */
- void
- ExecHashTableInsert(hashtable, econtext, hashkey, batches)
- HashJoinTable hashtable;
- ExprContext econtext;
- Var hashkey;
- File *batches;
- {
- TupleTableSlot slot;
- HeapTuple heapTuple;
- Const hashvalue;
- HashBucket bucket;
- int bucketno;
- int nbatch;
- int batchno;
- char *buffer;
- RelativeAddr *batchPos;
- int *batchSizes;
- #ifdef sequent
- slock_t *batchLock;
- #endif
- char *pos;
-
- nbatch = hashtable->nbatch;
- batchPos = (RelativeAddr*)ABSADDR(hashtable->innerbatchPos);
- batchSizes = (int*)ABSADDR(hashtable->innerbatchSizes);
- #ifdef sequent
- batchLock = (slock_t*)ABSADDR(hashtable->batchLock);
- #endif
-
- slot = get_ecxt_innertuple(econtext);
- heapTuple = (HeapTuple) CAR(slot);
-
- #ifdef HJDEBUG
- printf("Inserting ");
- #endif
-
- bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
-
- /* ----------------
- * decide whether to put the tuple in the hash table or a tmp file
- * ----------------
- */
- if (bucketno < hashtable->nbuckets) {
- /* ---------------
- * put the tuple in hash table
- * ---------------
- */
- bucket = (HashBucket)
- (ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
- #ifdef sequent
- /* --------------
- * lock the hash bucket
- * ---------------
- */
- if (ParallelExecutorEnabled())
- S_LOCK(&(bucket->bucketlock));
- #endif
- if ((char*)LONGALIGN(ABSADDR(bucket->bottom))
- -(char*)bucket+heapTuple->t_len > hashtable->bucketsize)
- ExecHashOverflowInsert(hashtable, bucket, heapTuple);
- else {
- bcopy(heapTuple,(char*)LONGALIGN(ABSADDR(bucket->bottom)),
- heapTuple->t_len);
- bucket->bottom =
- ((RelativeAddr)LONGALIGN(bucket->bottom) + heapTuple->t_len);
- }
- #ifdef sequent
- /* --------------
- * insertion done, unlock the hash bucket
- * --------------
- */
- if (ParallelExecutorEnabled())
- S_UNLOCK(&(bucket->bucketlock));
- #endif
- }
- else {
- /* -----------------
- * put the tuple into a tmp file for other batches
- * -----------------
- */
- batchno = (float)(bucketno - hashtable->nbuckets)/
- (float)(hashtable->totalbuckets - hashtable->nbuckets)
- * nbatch;
- buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
- #ifdef sequent
- /* ----------------
- * lock the batch
- * ----------------
- */
- if (ParallelExecutorEnabled())
- S_LOCK(&(batchLock[batchno]));
- #endif
- batchSizes[batchno]++;
- pos= (char *)
- ExecHashJoinSaveTuple(heapTuple,
- buffer,
- batches[batchno],
- ABSADDR(batchPos[batchno]));
- batchPos[batchno] = RELADDR(pos);
- #ifdef sequent
- /* -----------------
- * unlock the batch
- * -----------------
- */
- if (ParallelExecutorEnabled())
- S_UNLOCK(&(batchLock[batchno]));
- #endif
- }
- }
-
- /* ----------------------------------------------------------------
- * ExecHashTableDestroy
- *
- * destroy a hash table
- * ----------------------------------------------------------------
- */
- void
- ExecHashTableDestroy(hashtable)
- HashJoinTable hashtable;
- {
- #ifdef sequent
- IPCPrivateMemoryKill(0, hashtable->shmid);
- #else /* sequent */
- pfree((Pointer)hashtable);
- #endif /* sequent */
- }
-
- /* ----------------------------------------------------------------
- * ExecHashGetBucket
- *
- * Get the hash value for a tuple
- * ----------------------------------------------------------------
- */
- int
- ExecHashGetBucket(hashtable, econtext, hashkey)
- HashJoinTable hashtable;
- ExprContext econtext;
- Var hashkey;
- {
- int bucketno;
- HashBucket bucket;
- Const hashvalue;
- Datum keyval;
- extern Boolean execConstByVal;
- extern int execConstLen;
- Boolean isNull;
-
-
- /* ----------------
- * Get the join attribute value of the tuple
- * ----------------
- */
- keyval = ExecEvalVar(hashkey, econtext, &isNull);
-
- /* ------------------
- * compute the hash function
- * ------------------
- */
- if (execConstByVal)
- bucketno =
- hashFunc((char *) &keyval, execConstLen) % hashtable->totalbuckets;
- else
- bucketno =
- hashFunc((char *) keyval, execConstLen) % hashtable->totalbuckets;
- #ifdef HJDEBUG
- if (bucketno >= hashtable->nbuckets)
- printf("hash(%d) = %d SAVED\n", keyval, bucketno);
- else
- printf("hash(%d) = %d\n", keyval, bucketno);
- #endif
-
- return(bucketno);
- }
-
- /* ----------------------------------------------------------------
- * ExecHashOverflowInsert
- *
- * insert into the overflow area of a hash bucket
- * ----------------------------------------------------------------
- */
- void
- ExecHashOverflowInsert(hashtable, bucket, heapTuple)
- HashJoinTable hashtable;
- HashBucket bucket;
- HeapTuple heapTuple;
- {
- OverflowTuple otuple;
- RelativeAddr newend;
- OverflowTuple firstotuple;
- OverflowTuple lastotuple;
-
- firstotuple = (OverflowTuple)ABSADDR(bucket->firstotuple);
- lastotuple = (OverflowTuple)ABSADDR(bucket->lastotuple);
- #ifdef sequent
- if (ParallelExecutorEnabled()) {
- /* -----------------
- * Here we assume that the current bucket lock has already
- * been acquried. But we still need to lock the overflow lock.
- * This could be a potential bottleneck.
- * -----------------
- */
- S_LOCK(&(hashtable->overflowlock));
- }
- #endif
- /* ----------------
- * see if we run out of overflow space
- * ----------------
- */
- newend = (RelativeAddr)LONGALIGN(hashtable->overflownext + sizeof(*otuple)
- + heapTuple->t_len);
- if (newend > hashtable->bottom) {
- #ifdef sequent
- elog(WARN, "hash table out of memory.");
- #else
- elog(DEBUG, "hash table out of memory. expanding.");
- /* ------------------
- * XXX this is a temporary hack
- * eventually, recursive hash partitioning will be
- * implemented
- * ------------------
- */
- hashtable->readbuf = hashtable->bottom = 2 * hashtable->bottom;
- hashtable = (HashJoinTable)repalloc(hashtable, hashtable->bottom+BLCKSZ);
- if (hashtable == NULL) {
- perror("repalloc");
- elog(WARN, "can't expand hashtable.");
- }
- #endif
- }
-
- /* ----------------
- * establish the overflow chain
- * ----------------
- */
- otuple = (OverflowTuple)ABSADDR(hashtable->overflownext);
- hashtable->overflownext = newend;
- #ifdef sequent
- /* ------------------
- * unlock the overflow chain
- * ------------------
- */
- if (ParallelExecutorEnabled())
- S_UNLOCK(&(hashtable->overflowlock));
- #endif
- if (firstotuple == NULL)
- bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
- else {
- lastotuple->next = RELADDR(otuple);
- bucket->lastotuple = RELADDR(otuple);
- }
-
- /* ----------------
- * copy the tuple into the overflow area
- * ----------------
- */
- otuple->next = -1;
- otuple->tuple = RELADDR(LONGALIGN(((char*)otuple + sizeof(*otuple))));
- bcopy(heapTuple, ABSADDR(otuple->tuple), heapTuple->t_len);
- }
-
- /* ----------------------------------------------------------------
- * ExecScanHashBucket
- *
- * scan a hash bucket of matches
- * ----------------------------------------------------------------
- */
- HeapTuple
- ExecScanHashBucket(hjstate, bucket, curtuple, hjclauses, econtext)
- HashJoinState hjstate;
- HashBucket bucket;
- HeapTuple curtuple;
- List hjclauses;
- ExprContext econtext;
- {
- HeapTuple heapTuple;
- bool qualResult;
- OverflowTuple otuple = NULL;
- OverflowTuple curotuple;
- TupleTableSlot inntuple;
- OverflowTuple firstotuple;
- OverflowTuple lastotuple;
- HashJoinTable hashtable;
-
- hashtable = get_hj_HashTable(hjstate);
- firstotuple = (OverflowTuple)ABSADDR(bucket->firstotuple);
- lastotuple = (OverflowTuple)ABSADDR(bucket->lastotuple);
-
- /* ----------------
- * search the hash bucket
- * ----------------
- */
- if (curtuple == NULL || curtuple < (HeapTuple)ABSADDR(bucket->bottom)) {
- if (curtuple == NULL)
- heapTuple = (HeapTuple)
- LONGALIGN(ABSADDR(bucket->top));
- else
- heapTuple = (HeapTuple)
- LONGALIGN(((char*)curtuple+curtuple->t_len));
-
- while (heapTuple < (HeapTuple)ABSADDR(bucket->bottom)) {
-
- inntuple = (TupleTableSlot)
- ExecStoreTuple((Pointer) heapTuple, /* tuple to store */
- (Pointer) get_hj_HashTupleSlot(hjstate),
- /* slot */
- InvalidBuffer, /* tuple has no buffer */
- false); /* do not pfree this tuple */
-
- set_ecxt_innertuple(econtext, inntuple);
- qualResult = ExecQual(hjclauses, econtext);
-
- if (qualResult)
- return heapTuple;
-
- heapTuple = (HeapTuple)
- LONGALIGN(((char*)heapTuple+heapTuple->t_len));
- }
-
- if (firstotuple == NULL)
- return NULL;
- otuple = firstotuple;
- }
-
- /* ----------------
- * search the overflow area of the hash bucket
- * ----------------
- */
- if (otuple == NULL) {
- curotuple = get_hj_CurOTuple(hjstate);
- otuple = (OverflowTuple)ABSADDR(curotuple->next);
- }
-
- while (otuple != NULL) {
- heapTuple = (HeapTuple)ABSADDR(otuple->tuple);
-
- inntuple = (TupleTableSlot)
- ExecStoreTuple((Pointer) heapTuple, /* tuple to store */
- (Pointer) get_hj_HashTupleSlot(hjstate), /* slot */
- InvalidBuffer, /* SP?? this tuple has no buffer */
- false); /* do not pfree this tuple */
-
- set_ecxt_innertuple(econtext, inntuple);
- qualResult = ExecQual(hjclauses, econtext);
-
- if (qualResult) {
- set_hj_CurOTuple(hjstate, otuple);
- return heapTuple;
- }
-
- otuple = (OverflowTuple)ABSADDR(otuple->next);
- }
-
- /* ----------------
- * no match
- * ----------------
- */
- return NULL;
- }
-
- /* ----------------------------------------------------------------
- * hashFunc
- *
- * the hash function, copied from Margo
- * ----------------------------------------------------------------
- */
-
- int
- hashFunc(key,len)
- char *key;
- int len;
- {
- register unsigned int h;
- register int l;
- register unsigned char *k;
-
- /*
- * If this is a variable length type, then 'k' points
- * to a "struct varlena" and len == -1.
- * NOTE:
- * VARSIZE returns the "real" data length plus the sizeof the
- * "vl_len" attribute of varlena (the length information).
- * 'k' points to the beginning of the varlena struct, so
- * we have to use "VARDATA" to find the beginning of the "real"
- * data.
- */
- if (len == -1) {
- l = VARSIZE(key) - sizeof(long); /* 'varlena.vl_len' is a long*/
- k = (unsigned char*) VARDATA(key);
- } else {
- l = len;
- k = (unsigned char *) key;
- }
-
- h = 0;
-
- /*
- * Convert string to integer
- */
- while (l--) h = h * PRIME1 ^ (*k++);
- h %= PRIME2;
-
- return (h);
- }
-
- /* ----------------------------------------------------------------
- * ExecHashPartition
- *
- * determine the number of batches needed for a hashjoin
- * ----------------------------------------------------------------
- */
-
- int
- ExecHashPartition(node)
- Hash node;
- {
- Plan outerNode;
- int b;
- int pages;
- int ntuples;
- int tupsize;
-
- /*
- * get size information for plan node
- */
- outerNode = get_outerPlan((Plan) node);
- ntuples = get_plan_size(outerNode);
- if (ntuples == 0) ntuples = 1000;
- tupsize = get_plan_width(outerNode) + sizeof(HeapTupleData);
- pages = ceil((double)ntuples * tupsize * FUDGE_FAC / BLCKSZ);
-
- /*
- * if amount of buffer space below hashjoin threshold,
- * return negative
- */
- if (ceil(sqrt((double)pages)) > HashTBSize)
- return -1;
- if (pages <= HashTBSize)
- b = 0; /* fit in memory, no partitioning */
- else
- b = ceil((double)(pages - HashTBSize)/(double)(HashTBSize - 1));
-
- return b;
- }
-
- /* ----------------------------------------------------------------
- * ExecHashTableReset
- *
- * reset hash table header for new batch
- * ----------------------------------------------------------------
- */
-
- void
- ExecHashTableReset(hashtable, ntuples)
- HashJoinTable hashtable;
- int ntuples;
- {
- int i;
- HashBucket bucket;
-
- hashtable->nbuckets = hashtable->totalbuckets
- = ceil((double)ntuples/NTUP_PER_BUCKET);
-
- hashtable->overflownext = hashtable->top + hashtable->bucketsize *
- hashtable->nbuckets;
-
- bucket = (HashBucket)ABSADDR(hashtable->top);
- for (i=0; i<hashtable->nbuckets; i++) {
- bucket->top = RELADDR((char*)bucket + sizeof(*bucket));
- bucket->bottom = bucket->top;
- bucket->firstotuple = bucket->lastotuple = -1;
- #ifdef sequent
- S_INIT_LOCK(&(bucket->bucketlock));
- #endif
- bucket = (HashBucket)((char*)bucket + hashtable->bucketsize);
- }
- hashtable->pcount = hashtable->nprocess;
- }
-
/*
 * Exclusive upper bound of the pid part of a temp file name.  Callers
 * in this file reserve 12 bytes per name, which is exactly "HJ" + at
 * most 5 pid digits + "." + at most 3 counter digits + the NUL byte.
 */
#define HJ_TEMP_PID_MOD 100000

/* per-process sequence number for temp file names, cycles 0..999 */
static int hjtmpcnt = 0;

/*
 * mk_hj_temp
 *
 *      Write the next hashjoin temp file name, "HJ<pid>.<n>", into
 *      `tempname`.  The pid is reduced modulo HJ_TEMP_PID_MOD so the
 *      result never needs more than 12 bytes including the terminator;
 *      for pids below that bound the name is unchanged.  Names repeat
 *      after 1000 calls within one process.
 */
void
mk_hj_temp(tempname)
    char *tempname;
{
    sprintf(tempname, "HJ%d.%d",
            (int) (getpid() % HJ_TEMP_PID_MOD), hjtmpcnt);
    hjtmpcnt = (hjtmpcnt + 1) % 1000;
}
-
- print_buckets(hashtable)
- HashJoinTable hashtable;
- {
- int i;
- HashBucket bucket;
- bucket = (HashBucket)ABSADDR(hashtable->top);
- for (i=0; i<hashtable->nbuckets; i++) {
- printf("bucket%d = (top = %d, bottom = %d, firstotuple = %d, lastotuple = %d)\n", i, bucket->top, bucket->bottom, bucket->firstotuple, bucket->lastotuple);
- bucket = (HashBucket)((char*)bucket + hashtable->bucketsize);
- }
- }
-